<?php

namespace process;

use support\Redis;
use Workerman\Mqtt\Client;

class Mqtt
{
    public function onWorkerStart()
    {
        //订阅主题
        $mqtt = new Client('mqtt://127.0.0.1:1883');
        $mqtt->onConnect = function($mqtt){
            $topic_array=json_decode(Redis::get('topic'),true);
            if ($topic_array){
                $mqtt->subscribe($topic_array);
            }
        };
        $mqtt->onMessage = function($topic, $content){
            if (is_string($content)){
                if (json_decode($content)){
                    $content=json_decode($content,true);
                }
            }
            //消费主题类
            $classname = "app\\queue\\mqtt\\MqttConsumption";
            if (str_contains($topic, "/")){
                //包含斜杠是设备主题
                call_user_func_array([$classname,'Device'],[$topic,$content]);
            }else{
                call_user_func_array([$classname,$topic],[$content]);
            }
        };
        $mqtt->connect();
    }
}
